package com.lianda.example.hot.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * 用 Flink Table SQL，
 * 实时获取每小时每个商品下点击量最高的Top3
 * https://blog.csdn.net/wangpei1949/article/details/105471974
 */
/**
 * Flink Table SQL job: for every hour and every product, continuously computes the
 * Top 3 users by click PV from a Kafka stream of user-action events and upserts the
 * result into MySQL via JDBC.
 *
 * <p>Pipeline: Kafka (JSON user actions) -> hourly GROUP BY aggregation ->
 * ROW_NUMBER() Top-N per (hour, product) -> MySQL sink.
 */
public class TableSqlHotMain {
    public static void main(String[] args) throws Exception {
        // Blink planner in streaming mode is required for the Top-N (ROW_NUMBER) pattern below.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Source DDL: user-action events consumed from Kafka as JSON.
        // eventTime is kept as STRING; the hour is later extracted with SUBSTRING(eventTime, 1, 13).
        String sourceDDL = "create table source_kafka "
                + "( "
                + "    userID String, "
                + "    eventType String, "
                + "    eventTime String, "
                + "    productID String "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.11', "
                + "    'connector.properties.bootstrap.servers' = 'localhost:9092', "
                + "    'connector.properties.zookeeper.connect' = 'localhost:2181',   "
                + "    'connector.topic' = 'source_user_action', "
                + "    'connector.properties.group.id' = 'sql_test', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
        tableEnv.sqlUpdate(sourceDDL);


        // Sink DDL: per-hour, per-product Top-N rows written to MySQL over JDBC.
        // Flushes every 50 rows or every 2 seconds, retrying failed writes up to 3 times.
        String sinkDDL = "create table sink_mysql "
                + "( "
                + "     datetime STRING, "
                + "     productID STRING, "
                + "     userID STRING, "
                + "     clickPV BIGINT "
                + ") with ( "
                + "    'connector.type' = 'jdbc', "
                + "    'connector.url' = 'jdbc:mysql://localhost:3306/test', "
                + "    'connector.table' = 't_product_click_topn', "
                + "    'connector.username' = 'root', "
                + "    'connector.password' = '123456', "
                + "    'connector.write.flush.max-rows' = '50', "
                + "    'connector.write.flush.interval' = '2s', "
                + "    'connector.write.max-retries' = '3' "
                + ")";
        tableEnv.sqlUpdate(sinkDDL);

        // Continuous query:
        //   1. Inner query: count clicks per (hour, product, user). The hour is the first
        //      13 chars of eventTime, e.g. "2020-04-12 18".
        //   2. Outer query: rank users per (hour, product) by clickPV and keep the Top 3.
        // NOTE: fixed two column-name bugs from the original SQL —
        //   - PARTITION BY used "datatime" but the inner alias is "datetime";
        //   - "eventTIme" did not match the declared column "eventTime"
        //     (Flink SQL identifiers are case-sensitive).
        String querySQL =  ""
                + "INSERT INTO sink_mysql "
                + "SELECT datetime, productID, userID, clickPV "
                + "FROM ( "
                + "    SELECT *, "
                + "    ROW_NUMBER() OVER (PARTITION BY datetime, productID ORDER BY clickPV desc) "
                + "    AS rownum "
                + "    FROM (SELECT SUBSTRING(eventTime, 1, 13) AS datetime, "
                + "                 productID, "
                + "                 userID, "
                + "                 COUNT(1) AS clickPV "
                + "                 FROM source_kafka "
                + "                 GROUP BY SUBSTRING(eventTime, 1, 13), productID, userID ) a"
                + ") t WHERE rownum <= 3";
        tableEnv.sqlUpdate(querySQL);
        tableEnv.execute(TableSqlHotMain.class.getSimpleName());
    }
}
